package com.csw.android.androidtest.module.rxjava;

import android.os.Bundle;
import android.view.View;

import com.csw.android.androidtest.R;
import com.csw.android.androidtest.view.LogView;
import com.csw.android.dev_utils.ui.BaseFragment;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;

/**
 * RxJava基础测试
 * <br/>
 * RxJava的作用是将嵌套调用的任务以链式编程的方式来写代码，减少代码嵌套层次。
 * 并实现了各环节之间的线程切换功能，使得在多线程中处理任务变得简单
 */
public class RxjavaBaseTest extends BaseFragment {
    private LogView logView;

    @Override
    public int getContentViewID() {
        return R.layout.fragment_rx_java_base_test;
    }

    @Override
    public void initView(@NotNull View rootView, @Nullable Bundle savedInstanceState) {
        super.initView(rootView, savedInstanceState);
        logView = rootView.findViewById(R.id.logView);
    }

    private Disposable disposable;

    /**
     * RxJava的实现是通过它定义的各种操作符进行链式调用，每个操作符都是对上个Observable的包装，用包装的方式解除代码嵌套
     * 如下，编码过程中是先写执行任务，然后用操作符进行链式编码对任务前置或者后置步骤进行处理，代码执行按箭头所示，
     * 由最外层的操作符10向内部包含的操作符9订阅，最终订阅到任务执行处，这里可以称之为订阅阶段，任务执行过程中发出各种事件
     * 主要有onNext onComplete onError，事件经过每层包装的操作符往最外层传递
     * <p>
     * 任务
     * ↑        ↓ 操作符1
     * ↑        ↓ 操作符2
     * ↑        ↓ 操作符3
     * ↑        ↓ 操作符4
     * ↑        ↓ 操作符5
     * ↑        ↓ 操作符6
     * ↑        ↓ 操作符7
     * ↑        ↓ 操作符8
     * ↑        ↓ 操作符9
     * ↑        ↓ 操作符10
     * <p>
     * Observable.create用于创建任务，subscribe用于订阅并执行任务。
     * 整个过程可以通过操作符subscribeOn和observeOn进行线程切换。
     * 如下，是demo的线程切换过程
     * <p>
     * create(休眠一秒)
     * ↑ (切换线程)         ↓ subscribeOn(io)
     * ↑                   ↓ (切换线程)observeOn(computation)
     * ↑                   ↓ (通知过程执行)doOnNext
     * ↑                   ↓ (切换线程)observeOn(io)
     * ↑                   ↓ (通知过程执行)doOnError
     * ↑                   ↓ (切换线程)observeOn(newThread)
     * ↑                   ↓ (通知过程执行)doOnComplete
     * ↑                   ↓ (切换线程)observeOn(main)
     * ↑ (订阅过程执行)     ↓ doOnSubscribe2
     * ↑ (切换线程)         ↓ subscribeOn(computation)
     * ↑ (订阅过程执行)     ↓ doOnSubscribe
     * 订阅               通知（onNext,onComplete,onError）
     * <p>
     * doOnSubscribe的实现是，在执行订阅时，向观察者发出onSubscribe事件，然后切换到指定线程，再向内层的
     * Observable执行订阅观察者操作，这样后续代码即可在指定线程执行。
     * <p>
     * observeOn的实现是，接收到的通知事件存储起来，在指定线程启动后开始循环处理事件，后续通知即可在指定线程执行
     * <p>
     * 另外还有map，flatMap等各种操作符，还有zip，delay，empty，just等各种快速创建任务的方法，感兴趣可以阅
     * 读源码了解。
     * RxJava因为是基于观察者模式，它也提供了类似于EventBus那种事件总线的库，用于全局监听事件,实现模块解耦，
     * 以其明朗的链式调用，线程切换，使得大部分事务处理代码的可读性提升很大，能这么流行确实本身也强大。
     */
    @Override
    public void initData() {
        super.initData();
        logView.clearLog();
        logView.addLog(generateLog("initData"));
        //RxJava是基于观察者模式，创建观察者，任务执行的每个阶段可以向观察者发出事件，他会去告知订阅者
        disposable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                //执行任务
                logView.addLog(generateLog("exec"));
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    if (!emitter.isDisposed()) {
                        //发出onError，用于任务失败时捕获异常发送事件
                        emitter.onError(e);
                    }
                }
                //发送事件前，需要先判断是否取消了订阅，如果是，可以结束任务
                if (!emitter.isDisposed()) {
                    //发出onNext，这里通常用于传值，可以调用多次
                    emitter.onNext("onNext");
                }
                if (!emitter.isDisposed()) {
                    //发出onComplete
                    emitter.onComplete();
                }
            }
        })
                //指定订阅过程的任务线程
                //Schedulers基于Java thread实现的线程调度，这里io使用普通优先级的线程
                //另有computation表示高优先级的线程，用于处理计算密集型的任务
                //newThread则是在新线程中执行
                .subscribeOn(Schedulers.io())
                //指定后续通知在computation线程中执行
                .observeOn(Schedulers.computation())
                //doOnNext用于通知过程中接收onNext事件
                .doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        logView.addLog(generateLog("doOnNext"));
                    }
                })
                //指定后续通知在io线程中执行
                .observeOn(Schedulers.io())
                //doOnError用于通知过程中接收onError事件
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        logView.addLog(generateLog("doOnError"));
                    }
                })
                //指定后续通知在io线程中执行
                .observeOn(Schedulers.newThread())
                //doOnComplete用于通知过程中接收onComplete事件
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        logView.addLog(generateLog("doOnComplete"));
                    }
                })
                //这里是指定后续通知过程的任务线程
                //AndroidSchedulers基于Android main thread与handler实现的线程调度
                .observeOn(AndroidSchedulers.mainThread())
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        logView.addLog(generateLog("doOnSubscribe2"));
                    }
                })
                //指定后续订阅过程在computation中处理
                .subscribeOn(Schedulers.computation())
                //doOnSubscribe用于订阅过程中，任务执行之前处理事情，比如可以显示一个对话框
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        logView.addLog(generateLog("doOnSubscribe"));
                    }
                })
                //订阅并执行任务
                .subscribe(
                        new Consumer<String>() {
                            @Override
                            public void accept(String s) throws Exception {
                                logView.addLog(generateLog(s));
                            }
                        },
                        new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                logView.addLog(generateLog("onError"));
                            }
                        },
                        new Action() {
                            @Override
                            public void run() throws Exception {
                                logView.addLog(generateLog("onComplete"));
                            }
                        }
                );
    }

    private String generateLog(String msg) {
        StringBuilder sb = new StringBuilder();
        if (msg != null) {
            sb.append(msg);
        }
        sb.append(" runOn_>");
        sb.append(Thread.currentThread().getName());
        return sb.toString();
    }

    @Override
    public void onDestroyView() {
        //若任务还未结束，则取消订阅
        if (!disposable.isDisposed()) {
            disposable.dispose();
        }
        logView = null;
        super.onDestroyView();
    }
}
